Welcome to the stream analytics tutorial for EpiData. In this tutorial we will perform near real-time stream analytics on sample weather data acquired from a simulated wireless sensor network.
As a first step, we will import the packages and modules required for this tutorial. Since EpiData Context (ec) is required to use the application, it is imported implicitly. Sample functions for near real-time analytics are available in the EpiData Analytics package. Other packages and modules, such as datetime, pandas and matplotlib, can also be imported at this time.
In [ ]:
#from epidata.context import ec
from epidata.analytics import *
%matplotlib inline
from datetime import datetime, timedelta
import pandas as pd
import time
import pylab as pl
from IPython import display
import json
EpiData supports development and deployment of custom algorithms via Jupyter Notebook. Below, we define Python functions for substituting extreme outliers and aggregating temperature measurements. These functions can be applied to both near real-time and historic data. In this tutorial, we will apply them to near real-time data available from the Kafka 'measurements' and 'measurements_substituted' topics.
In [ ]:
import pandas as pd
import numpy as np
import math, numbers

def substitute_demo(df, meas_names, method="rolling", size=3):
    """
    Substitute missing measurement values within a data frame, using the specified method.
    """
    # Treat the sensor's error value of 250 as a missing measurement.
    df["meas_value"].replace(250, np.nan, inplace=True)
    for meas_name in meas_names:
        if (method == "rolling"):
            # A centered rolling window requires an odd window size.
            if ((size % 2 == 0) and (size != 0)): size += 1
            if df.loc[df["meas_name"] == meas_name].size > 0:
                # Locate rows whose measurement value is missing.
                indices = df.loc[df["meas_name"] == meas_name].index[df.loc[df["meas_name"] == meas_name]["meas_value"].apply(
                    lambda x: not isinstance(x, basestring) and (x is None or np.isnan(x)))]
                # Fill the gaps with a centered rolling average and flag the substitutions.
                substitutes = df.loc[df["meas_name"] == meas_name]["meas_value"].rolling(
                    window=size, min_periods=1, center=True).mean()
                df["meas_value"].fillna(substitutes, inplace=True)
                df.loc[indices, "meas_flag"] = "substituted"
                df.loc[indices, "meas_method"] = "rolling average"
        else:
            raise ValueError("Unsupported substitution method: ", repr(method))
    return df
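As a quick sanity check, we can apply substitute_demo to a small, hypothetical data frame constructed locally (the column names below mirror EpiData's measurement records; the values are made up for illustration). The error value 250.0 at index 1 should be replaced by the centered rolling average of its neighbors:
In [ ]:
# Toy data frame for illustration only; not part of the sample data set.
df_toy = pd.DataFrame({"meas_name": ["Temperature"] * 5,
                       "meas_value": [68.0, 250.0, 71.0, 70.0, 69.5],
                       "meas_flag": [None] * 5,
                       "meas_method": [None] * 5})
# Index 1 is flagged as "substituted" and filled with mean(68.0, 71.0) = 69.5.
substitute_demo(df_toy, ["Temperature"])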
In [ ]:
import pandas as pd
import numpy as np
import json

def subgroup_statistics(row):
    # Annotate each subgroup with its time span and descriptive statistics.
    row["start_time"] = np.min(row["ts"])
    row["stop_time"] = np.max(row["ts"])
    row["meas_summary_name"] = "statistics"
    row["meas_summary_value"] = json.dumps({'count': row["meas_value"].count(), 'mean': row["meas_value"].mean(),
                                            'std': row["meas_value"].std(), 'min': row["meas_value"].min(),
                                            'max': row["meas_value"].max()})
    row["meas_summary_description"] = "descriptive statistics"
    return row

def meas_statistics_demo(df, meas_names, method="standard"):
    """
    Compute statistics on measurement values within a data frame, using the specified method.
    """
    if (method == "standard"):
        # Group the selected measurements by sensor and summarize each subgroup.
        df_grouped = df.loc[df["meas_name"].isin(meas_names)].groupby(["company", "site", "station", "sensor"],
                                                                      as_index=False)
        df_summary = df_grouped.apply(subgroup_statistics).loc[:, ["company", "site", "station", "sensor",
            "start_time", "stop_time", "event", "meas_name", "meas_summary_name", "meas_summary_value",
            "meas_summary_description"]].drop_duplicates()
    else:
        raise ValueError("Unsupported summary method: ", repr(method))
    return df_summary
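To preview the summary output, we can run meas_statistics_demo on a few hypothetical measurements constructed locally (again, illustrative values only):
In [ ]:
# Toy measurements for a single sensor; for illustration only.
df_toy = pd.DataFrame({"company": ["EpiData"] * 4, "site": ["San_Jose"] * 4,
                       "station": ["WSN-1"] * 4, "sensor": ["Temperature_Probe"] * 4,
                       "event": ["none"] * 4, "meas_name": ["Temperature"] * 4,
                       "ts": pd.date_range("2017-08-19", periods=4, freq="T"),
                       "meas_value": [68.0, 69.5, 71.0, 70.0]})
# Returns one row per sensor group, with the statistics encoded as JSON.
meas_statistics_demo(df_toy, ["Temperature"])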
The analytics algorithms are executed on near real-time data through transformations. A transformation specifies the function to apply, its parameters, and the destination of its output. The destination can be one of the database tables, namely 'measurements_cleansed' or 'measurements_summary', or another Kafka topic.
Once the transformations are defined, they are initiated via the ec.create_stream(transformations, data_source, batch_duration) function call.
In [ ]:
#Stop current near real-time processing
ec.stop_streaming()
In [ ]:
# Define transformations and stream operations
op1 = ec.create_transformation(substitute_demo, [["Temperature", "Wind_Speed"], "rolling", 3], "measurements_substituted")
ec.create_stream([op1], "measurements")
op2 = ec.create_transformation(identity, [], "measurements_cleansed")
op3 = ec.create_transformation(meas_statistics_demo, [["Temperature", "Wind_Speed"], "standard"], "measurements_summary")
ec.create_stream([op2, op3], "measurements_substituted")
# Start near real-time processing
ec.start_streaming()
We can now start data ingestion from the simulated wireless sensor network. To do so, download and run the sensor_data_with_outliers.py example script.
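For reference, here is a minimal, hypothetical sketch of what such a simulator script could look like, assuming it publishes JSON measurements directly to the 'measurements' Kafka topic; the actual sensor_data_with_outliers.py script may ingest data through a different path (for example, EpiData's ingestion API):
# Hypothetical simulator sketch; the bundled sensor_data_with_outliers.py may differ.
import json, random, time
from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
while True:
    meas_value = random.gauss(70.0, 2.0)
    if random.random() < 0.05:
        meas_value = 250.0  # occasional extreme outlier / sensor error value
    producer.send("measurements", {
        "company": "EpiData", "site": "San_Jose", "station": "WSN-1",
        "sensor": "Temperature_Probe", "event": "none",
        "ts": int(time.time() * 1000),
        "meas_name": "Temperature", "meas_value": meas_value,
        "meas_unit": "deg F"})
    time.sleep(0.1)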
We query the original and processed data from the Kafka topics using a Kafka consumer. The data obtained from the query is visualized using Bokeh charts.
In [ ]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.layouts import row, column
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from kafka import KafkaConsumer
import json
from pandas.io.json import json_normalize
output_notebook()
In [ ]:
plot1 = figure(plot_width=750, plot_height=200, x_axis_type='datetime', y_range=(30, 300))
plot2 = figure(plot_width=750, plot_height=200, x_axis_type='datetime', y_range=(30, 300))
df_kafka_init = pd.DataFrame(columns = ["ts", "meas_value"])
test_data_1 = ColumnDataSource(data=df_kafka_init.to_dict(orient='list'))
test_data_2 = ColumnDataSource(data=df_kafka_init.to_dict(orient='list'))
meas_name = "Temperature"
plot1.circle("ts", "meas_value", source=test_data_1, legend=meas_name, line_color='orangered', line_width=1.5)
line1 = plot1.line("ts", "meas_value", source=test_data_1, legend=meas_name, line_color='orangered', line_width=1.5)
plot1.legend.location = "top_right"
plot2.circle("ts", "meas_value", source=test_data_2, legend=meas_name, line_color='blue', line_width=1.5)
line2 = plot2.line("ts", "meas_value", source=test_data_2, legend=meas_name, line_color='blue', line_width=1.5)
plot2.legend.location = "top_right"
In [ ]:
consumer = KafkaConsumer()
consumer.subscribe(['measurements', 'measurements_substituted'])
delay = .1
handle = show(column(plot1, plot2), notebook_handle=True)
In [ ]:
for message in consumer:
    topic = message.topic
    measurements = json.loads(message.value)
    df_kafka = json_normalize(measurements)
    df_kafka["meas_value"] = np.nan if "meas_value" not in measurements else measurements["meas_value"]
    df_kafka = df_kafka.loc[df_kafka["meas_name"] == meas_name]
    df_kafka = df_kafka[["ts", "meas_value"]]
    df_kafka["ts"] = df_kafka["ts"].apply(lambda x: pd.to_datetime(x, unit='ms').tz_localize('UTC').tz_convert('US/Pacific'))
    if (not df_kafka.empty):
        # Stream original data to the top chart and substituted data to the bottom chart.
        if (topic == 'measurements'):
            test_data_1.stream(df_kafka.to_dict(orient='list'))
        if (topic == 'measurements_substituted'):
            test_data_2.stream(df_kafka.to_dict(orient='list'))
        push_notebook(handle=handle)
    time.sleep(delay)
Another way to query and visualize processed data is to use the ec.query_measurements_cleansed(..) and ec.query_measurements_summary(..) functions. For our example, we specify parameters that match the sample data set, and query the aggregated values using the ec.query_measurements_summary(..) function call.
In [ ]:
# QUERY MEASUREMENTS_CLEANSED TABLE
primary_key={"company": "EpiData", "site": "San_Jose", "station":"WSN-1",
"sensor": ["Temperature_Probe", "RH_Probe", "Anemometer"]}
start_time = datetime.strptime('8/19/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('8/20/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
df_cleansed = ec.query_measurements_cleansed(primary_key, start_time, stop_time)
print "Number of records:", df_cleansed.count()
df_cleansed_local = df_cleansed.toPandas()
df_cleansed_local[df_cleansed_local["meas_name"] == "Temperature"].tail(10).sort_values(by="ts", ascending=False)
In [ ]:
# QUERY MEASUREMENTS_SUMMARY TABLE
primary_key={"company": "EpiData", "site": "San_Jose", "station":"WSN-1", "sensor": ["Temperature_Probe"]}
start_time = datetime.strptime('8/19/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('8/20/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
df_summary = ec.query_measurements_summary(primary_key, start_time, stop_time)
df_summary_local = df_summary.toPandas()
summary_keys = df_summary_local[["company", "site", "station", "sensor", "start_time", "stop_time", "meas_name", "meas_summary_name"]]
summary_result = df_summary_local["meas_summary_value"].apply(json.loads).apply(pd.Series)
summary_combined = pd.concat([summary_keys, summary_result], axis=1)
summary_combined.tail(5)
The transformations can be stopped at any time via the ec.stop_streaming() function call.
In [ ]:
#Stop current near real-time processing
ec.stop_streaming()
Congratulations, you have successfully performed near real-time analytics on sample data acquired by a simulated wireless sensor network. The next step is to explore the various capabilities of EpiData by creating your own custom analytics application!